home *** CD-ROM | disk | FTP | other *** search
- """gadfly server mode
-
- script usage
-
- python gfserve.py port database directory password [startup]
-
- test example
-
- python gfserve.py 2222 test dbtest admin gfstest
-
- port is the port to listen to
- database is the database to start up. (must exist!)
- directory is the directory the database is in.
- password is the administrative access password.
-
- startup if present should be the name of a module to use
- for startup. The Startup module must contain a function
-
- Dict = startup(admin_policy, connection, Server_instance)
-
- which performs any startup actions on the database needed
- and returns either None or a Dictionary of
-
- name > policy objects
-
- where the policy objects describe policies beyond the
- admin policy. The startup function may also
- modify the admin_policy (disabling queries for example).
-
- The arguments passed to startup are:
- admin_policy: the administrative policy
- eg you could turn queries off for admin, using admin
- only for server maintenance, or you could add prepared
- queries to the admin_policy.
- connection: the database connection
- eg you could perform some inserts before server start
- also needed to make policies.
- Server_instance
- Included for additional customization.
-
- Create policies using
- P = gfserve.Policy(name, password, connection, queries=0)
- -- for a "secure" policy with only prepared queries allowed,
- or
- P = gfserve.Policy(name, password, connection, queries=1)
- -- for a policy with full access arbitrary statement
- execution.
-
- add a "named prepared statement" to a policy using
- P[name] = statement
- for example
- P["updatenorm"] = '''
- update frequents
- set bar=?, perweek=?
- where drinker='norm'
- '''
- in this case 'updatenorm' requires 2 dynamic parameters when
- invoked from a client.
-
- Script stdout lists server logging information.
-
- Some server administration services (eg shutdown)
- are implemented by the script interpretion of gfclient.py.
- """
-
- import socket, gadfly
-
- from gfsocket import \
- reply_exception, reply_success, Packet_Reader, certify
-
- def main():
- """start up the server."""
- import sys
- try:
- done = 0
- argv = sys.argv
- nargs = len(argv)
- #print nargs, argv
- if nargs<5:
- sys.stderr.write("gfserve: not enough arguments: %s\n\n" % argv)
- sys.stderr.write(__doc__)
- return
- [port, db, dr, pw] = argv[1:5]
- print "gfserve startup port=%s db=%s, dr=%s password omitted" % (
- port, db, dr)
- from string import atoi
- port = atoi(port)
- startup = None
- if nargs>5:
- startup = argv[5]
- print "gfserve: load startup module %s" % startup
- S = Server(port, db, dr, pw, startup)
- S.init()
- print "gfserve: server initialized, setting stderr=stdout"
- sys.stderr = sys.stdout
- print "gfserve: starting the server"
- S.start()
- done = 1
- finally:
- if not done:
- print __doc__
-
- # general error
- ServerError = "ServerError"
-
- # no such prepared name
- PreparedNameError = "PreparedNameError"
-
- # actions
-
- # shut down the server (admin policy only)
- # arguments = ()
- # shutdown the server with no checkpoint
- SHUTDOWN = "SHUTDOWN"
-
- # restart the server (admin only)
- # arguments = ()
- # restart the server (recover)
- # no checkpoint
- RESTART = "RESTART"
-
- # checkpoint the server (admin only)
- # arguments = ()
- # checkpoint the server
- CHECKPOINT = "CHECKPOINT"
-
- # exec prepared statement
- # arguments = (prepared_name_string, dyn=None)
- # execute the prepared statement with dynamic args.
- # autocommit.
- EXECUTE_PREPARED = "EXECUTE_PREPARED"
-
- # exec any statement (only if not disabled)
- # arguments = (statement_string, dyn=None)
- # execute the statement with dynamic args.
- # autocommit.
- EXECUTE_STATEMENT = "EXECUTE_STATEMENT"
-
- ACTIONS = [SHUTDOWN, RESTART, CHECKPOINT,
- EXECUTE_PREPARED, EXECUTE_STATEMENT]
-
- class Server:
- """database server: listen for commands"""
-
- verbose = 1
-
- # wait X minutes on each server loop
- select_timeout = 60*5
-
- # do a checkpoint each X times thru server loop
- check_loop = 5
-
- # for now works like finger/http
- # == each command is a separate connection.
- # all sql commands constitute separate transactions
- # which are automatically committed upon success.
- # for now commands come in as
- # 1 length (marshalled int)
- # 2 (password, data) (marshalled tuple)
- # responses come back as
- # 1 length (marshalled int)
- # 2 results (marshalled value)
-
- def __init__(self, port, db, dr, pw, startup=None):
- self.port = port
- self.db = db
- self.dr = dr
- self.pw = pw
- self.startup = startup
- self.connection = None
- self.socket = None
- # prepared cursors dictionary.
- self.cursors = {}
- self.policies = {}
- self.admin_policy = None
-
- def start(self):
- """after init, listen for commands."""
- from gfsocket import READY, ERROR, unpack_certified_data
- import sys
- verbose = self.verbose
- socket = self.socket
- connection = self.connection
- policies = self.policies
- admin_policy = self.admin_policy
- from select import select
- pending_connects = {}
- while 1:
- try:
- # main loop
- if self.check_loop<0: self.check_loop=5
- for i in xrange(self.check_loop):
- if verbose:
- print "main loop on", socket, connection
- # checkpoint loop
- sockets = [socket]
- if pending_connects:
- sockets = sockets + pending_connects.keys()
- # wait for availability
- if verbose:
- print "server: waiting for connection(s)"
- (readables, dummy, errors) = select(\
- sockets, [], sockets[:], self.select_timeout)
- if socket in errors:
- raise ServerError, \
- "listening socket in error state: aborting"
- # clean up error connection sockets
- for s in errors:
- del pending_connects[s]
- s.close()
- # get a new connection, if available
- if socket in readables:
- readables.remove(socket)
- (conn, addr) = socket.accept()
- if 1 or verbose:
- print "connect %s" % (addr,)
- reader = Packet_Reader(conn)
- pending_connects[conn] = reader
- # poll readable pending connections, if possible
- for conn in readables:
- reader = pending_connects[conn]
- mode = reader.mode
- if not mode==READY:
- if mode == ERROR:
- # shouldn't happen
- try:
- conn.close()
- del pending_connects[conn]
- except: pass
- continue
- else:
- try:
- reader.poll()
- finally:
- pass # AFTER DEBUG CHANGE THIS!
- # in blocking mode, service ready request,
- # commit on no error
- for conn in pending_connects.keys():
- reader = pending_connects[conn]
- mode = reader.mode
- if mode == ERROR:
- try:
- del pending_connects[conn]
- conn.close()
- except: pass
- elif mode == READY:
- try:
- del pending_connects[conn]
- data = reader.data
- (actor_name, cert, md) = \
- unpack_certified_data(data)
- # find the policy for this actor
- if not policies.has_key(actor_name):
- if verbose:
- print "no such policy: "+actor_name
- reply_exception(NameError,
- "no such policy: "+actor_name, conn)
- policy = None
- else:
- if verbose:
- print "executing for", actor_name
- policy = policies[actor_name]
- policy.action(cert, md, conn)
- except SHUTDOWN:
- if policy is admin_policy:
- print \
- "shutdown on admin policy: terminating"
- connection.close()
- socket.close()
- # NORMAL TERMINATION:
- return
- except RESTART:
- if policy is admin_policy:
- print \
- "restart from admin policy: restarting connection"
- connection.restart()
- except CHECKPOINT:
- if policy is admin_policy:
- print \
- "checkpoint from admin policy: checkpointing now."
- connection.checkpoint()
- except:
- tb = sys.exc_traceback
- info = "%s %s" % (sys.exc_type,
- str(sys.exc_value))
- if verbose:
- from traceback import print_tb
- print_tb(tb)
- print "error in executing action: "+info
- reply_exception(
- ServerError, "exception: "+info, conn)
- #break # stop after first request serviced!
- except:
- # except of main while 1 try statement
- tb = sys.exc_traceback
- ty = sys.exc_type
- va = sys.exc_value
- print "UNEXPECTED EXCEPTION ON MAINLOOP"
- from traceback import print_tb
- print_tb(tb)
- print "exception:", ty, va
- if not pending_connects:
- pending_connects = {}
- print "server: checkpointing"
- connection.checkpoint()
-
- def init(self):
- self.getconnection()
- self.startup_load()
- # get socket last in case of failure earlier
- self.getsocket()
-
-
- HOST = ""
- BACKLOG = 5
-
- def getsocket(self):
- """get the listening socket"""
- verbose = self.verbose
- import socket, sys
- if verbose:
- print "initializing listener socket"
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- if verbose:
- print "trying to set REUSEADDR",\
- sock.getsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR)
- sock.setsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR, 1)
- except:
- if verbose:
- print "set of REUSEADDR failed", sys.exc_type, sys.exc_value
- pass
- sock.bind((self.HOST, self.port))
- sock.listen(self.BACKLOG)
- self.socket = sock
- return sock
-
- def getconnection(self):
- """get the db connection"""
- from gadfly import gadfly
- c = self.connection = gadfly(self.db, self.dr)
- # don't automatically checkpoint upon commit
- c.autocheckpoint = 0
-
- def startup_load(self):
- """setup the policies and load startup module"""
- admin_policy = self.get_admin_policy()
- module_name = self.startup
- if module_name:
- module = __import__(module_name)
- # startup(admin_policy, connection, Server_instance)
- test = module.startup(admin_policy, self.connection, self)
- if test is not None:
- self.policies = test
- self.policies["admin"] = admin_policy
-
- def get_admin_policy(self):
- """return the admin policy for priviledged access."""
- p = self.admin_policy = Policy(
- "admin", self.pw, self.connection, queries=1)
- return p
-
- class Policy:
- """security policy"""
-
- verbose = 0
-
- # allow arbitrary sql statments
- general_queries = 0
-
- # dictionary of named accesses as strings
- named_accesses = None
-
- # dictionary of prepared named accesses
- prepared_cursors = None
-
- def __init__(self, name, password, connection, queries=0):
- """create a policy (name, password, connection)
-
- name is the name of the policy
- password is the access policy (None for no password)
- connection is the database connection.
- set queries to allow general accesses (unrestricted)
- """
- if self.verbose:
- print "policy.__init__", name
- self.general_queries = queries
- self.name = name
- self.password = password
- self.connection = connection
- self.socket = None
- self.named_accesses = {}
- self.prepared_cursors = {}
-
- def __setitem__(self, name, value):
- if self.verbose:
- print "policy", self.name, ":", (name, value)
- from types import StringType
- if type(name) is not StringType or type(value) is not StringType:
- raise ValueError, "cursor names and contents must be strings"
- self.named_accesses[name] = value
-
- def execute_named(self, name, params=None):
- """execute a named (prepared) sql statement"""
- if self.verbose:
- print "policy", self.name, "executes", name, params
- na = self.named_accesses
- pc = self.prepared_cursors
- con = self.connection
- if not na.has_key(name):
- raise PreparedNameError, "unknown access name: %s" % name
- stat = na[name]
- if pc.has_key(name):
- # get prepared query
- cursor = pc[name]
- else:
- # prepare a new cursor
- pc[name] = cursor = con.cursor()
- return self.execute(cursor, stat, params)
-
- def execute(self, cursor, statement, params=None):
- """execute a statement in a cursor"""
- if self.verbose:
- print "policy", self.name, "executes", statement, params
- cursor.execute(statement, params)
- # immediate commit!
- self.connection.commit()
- try:
- result = cursor.fetchall()
- description = cursor.description
- result = (description, result)
- except:
- result = None
- return result
-
- def execute_any_statement(self, statement, params=None):
- """execute any statement."""
- if self.verbose:
- print "policy", self.name, "executes", statement, params
- con = self.connection
- cursor = con.cursor()
- return self.execute(cursor, statement, params)
-
- def action(self, certificate, datastring, socket):
- """perform a database/server action after checking certificate"""
- verbose = self.verbose
- if verbose:
- print "policy", self.name, "action..."
- # make sure the certificate checks out
- if not self.certify(datastring, certificate, self.password):
- raise ServerError, "password certification failure"
- # unpack the datastring
- from marshal import loads
- test = loads(datastring)
- #if verbose:
- #print "data is", test
- (action, moredata) = test
- import sys
- if action in ACTIONS:
- action = "policy_"+action
- myaction = getattr(self, action)
- try:
- data = apply(myaction, moredata+(socket,))
- #self.reply_success(data)
- # pass up server level requests as exceptions
- except SHUTDOWN, detail:
- raise SHUTDOWN, detail
- except RESTART, detail:
- raise RESTART, detail
- except CHECKPOINT, detail:
- raise CHECKPOINT, detail
- except:
- tb = sys.exc_traceback
- exceptiondata = "%s\n%s" %(sys.exc_type,
- str(sys.exc_value))
- if verbose:
- from traceback import print_tb
- print_tb(tb)
- self.reply_exception(ServerError,
- "unexpected exception: "+exceptiondata, socket)
- raise ServerError, exceptiondata
- else:
- raise ServerError, "unknown action: "+`action`
-
- def certify(self, datastring, certificate, password):
- # hook for subclassing
- return certify(datastring, certificate, password)
-
- def policy_SHUTDOWN(self, socket):
- self.reply_success("attempting server shutdown", socket)
- raise SHUTDOWN, "please shut down the server"
-
- def policy_RESTART(self, socket):
- self.reply_success("attempting server restart", socket)
- raise RESTART, "please restart the server"
-
- def policy_CHECKPOINT(self, socket):
- self.reply_success("attempting server checkpoint", socket)
- raise CHECKPOINT, "please checkpoint the server"
-
- def policy_EXECUTE_PREPARED(self, name, dyn, socket):
- try:
- result = self.execute_named(name, dyn)
- self.reply_success(result, socket)
- except PreparedNameError, detail:
- self.reply_exception(PreparedNameError,
- "no such prepared statement: "+name,
- socket)
-
- def policy_EXECUTE_STATEMENT(self, stat, dyn, socket):
- if not self.general_queries:
- self.reply_exception(ServerError,
- "general statements disallowed on this policy",
- socket)
- raise ServerError, "illegal statement attempt for: "+self.name
- result = self.execute_any_statement(stat, dyn)
- self.reply_success(result, socket)
-
- def reply_exception(self, exc, info, socket):
- # hook for subclassing
- reply_exception(exc, info, socket)
-
- def reply_success(self, data, socket):
- # hook for subclassing
- reply_success(data, socket)
-
- if __name__=="__main__": main()
-